package cn.edu360.beans

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

object dmpwriteSparkCore {

  /**
   * Batch job: reads ad-log parquet data, counts records per
   * (provincename, cityname) pair, then writes the aggregate both as a
   * single local JSON file and into a MySQL table via JDBC.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(this.getClass.getSimpleName)
      // Use Kryo serialization for RDD data shipped between workers.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // JDBC target settings. NOTE(review): credentials are hard-coded;
    // consider moving them to a config file or program arguments.
    val url = "jdbc:mysql://192.168.254.5:3306/test?characterEncoding=utf-8"
    val table = "dmpwrite1"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "327652")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val parquet: DataFrame = sqlContext.read.parquet("d:/ParquetFile")

      // Emit one ((province, city), 1) pair per input row.
      // BUGFIX: the original parsed Row.toString().split(","), which breaks as
      // soon as any field value contains a comma (all indices silently shift)
      // and leaves the leading '[' glued to the first field. Read the columns
      // by name instead — the same names this job writes out below.
      val data: RDD[((String, String), Int)] = parquet.map(row => {
        val provincename = row.getAs[String]("provincename")
        val cityname = row.getAs[String]("cityname")
        ((provincename, cityname), 1)
      })

      // Sum the per-row 1s into a count per (province, city).
      val result: RDD[((String, String), Int)] = data.reduceByKey(_ + _)
      val resFinal: DataFrame =
        result.map(t => (t._2, t._1._1, t._1._2)).toDF("ct", "provincename", "cityname")

      // Write the aggregate as a single JSON file locally...
      resFinal.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/JsonFile1/")
      // ...and into the MySQL table via JDBC.
      resFinal.write.mode(SaveMode.Overwrite).jdbc(url, table, props)
    } finally {
      // Always release the SparkContext, even if a stage fails.
      sc.stop()
    }
  }
}
